package org.nbict.iot.protocol.task;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

import java.util.ArrayList;
import java.util.List;

/**
 * Trident function that emits an alert message when a per-key count exceeds
 * {@link #THRESHOLD}.
 *
 * <p>Input tuple layout, as read by {@link #execute}: field 0 is cast to
 * {@link String} (the key) and field 1 is cast to {@link Long} (the count).
 * At most one single-value tuple, holding the alert text, is emitted per
 * input tuple.
 *
 * <p>Created by songseven on 18/6/25.
 */
public class OutbreakDetector extends BaseFunction {

    /** A count must be strictly greater than this value to trigger an alert. */
    public static final int THRESHOLD = 1000;

    /**
     * Inspects one tuple and emits an alert if its count is above the threshold.
     *
     * @param tuple     input tuple; field 0 is read as the {@code String} key and
     *                  field 1 as the {@code Long} count
     * @param collector receives the one-element alert tuple when the threshold
     *                  is exceeded; untouched otherwise
     * @throws ClassCastException if field 0 is not a {@code String} or field 1
     *                            is not a {@code Long}
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String key = (String) tuple.getValue(0);
        Long count = (Long) tuple.getValue(1);

        // A missing count cannot exceed the threshold; checking for null here
        // avoids a NullPointerException from auto-unboxing in the comparison.
        if (count == null || count <= THRESHOLD) {
            return;
        }

        List<Object> values = new ArrayList<>(1);
        values.add("outbreak detected for [" + key + "]! => " + count);
        collector.emit(values);
    }
}
